package com.sharding.order.common.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Slf4j
@Component
public class RocketMQProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 可靠消息，确保消息一定刷入broker磁盘中
     * 前提：FLUSH DISK修改为同步刷盘
     * @param topic     消息主题
     * @param message   消息体
     * @param key       key
     * @param consumer  消息发送失败的处理
     */
    public void reliablySend(String topic, Object message, String key, Consumer<Object> consumer){
        try {
            SendResult sendResult = this.send(topic, message, key);
            if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                // 发送成功，但还未完成FLUSH DISK，此时消息存储于服务器队列（内存），当服务器宕机时，消息就会丢失
                log.warn("reliablySend FLUSH DISK fail,key:[{}]",key);
                consumer.accept(message);
            }
        }catch (Exception e){
            log.error("reliablySend error,key:[{}],e:[{}]",key,e);
            // 消息发送失败的处理
            consumer.accept(message);
        }
    }

    /**
     * 同步发送消息
     **/
    public SendResult send(String topic, Object message, String key) {
        return this.send(topic, null, message, key);
    }

    /**
     * 同步发送消息，带tag
     **/
    public SendResult send(String topic, String tag, Object message, String key) {
        //发送消息
        Message<Object> finalMessage = MessageBuilder.withPayload(message).setHeader("KEYS", key).build();
        String destination = topic;
        if (StringUtils.isNotBlank(tag)) {
            destination = topic + ":" + tag;
        }
        SendResult result = rocketMQTemplate.syncSend(destination, finalMessage);
        log.info("send message ok,key:[{}],message:[{}],sendResult:[{}]", key,message, result);
        return result;
    }


}